BT-06: Event Collectors
Goal: Implement the collector subsystem that reads security events from multiple sources and normalizes them into SecurityEvent objects.
Files:
- Create: /opt/security-blue-team/blueteam/collectors/base.py
- Create: /opt/security-blue-team/blueteam/collectors/db_audit.py
- Create: /opt/security-blue-team/blueteam/collectors/syslog_parser.py
- Create: /opt/security-blue-team/blueteam/collectors/nginx_log.py
- Create: /opt/security-blue-team/blueteam/collectors/php_error.py
- Create: /opt/security-blue-team/blueteam/collectors/redteam_report.py
- Create: /opt/security-blue-team/tests/test_collectors.py
Depends on: BT-05
Step 1: Write tests for collector base and DB collector
# tests/test_collectors.py
import pytest
from datetime import datetime
from blueteam.models import SecurityEvent
from blueteam.collectors.base import BaseCollector
def test_security_event_creation():
event = SecurityEvent(
timestamp=datetime.utcnow(),
source="test",
category="auth",
severity="high",
action="login_failed",
user_id="abc-123",
ip_address="192.168.1.1",
details={"reason": "invalid_password"},
nist_controls=["3.3.1", "3.5.2"],
)
assert event.source == "test"
assert event.severity == "high"
assert not event.cui_involved
def test_base_collector_is_abstract():
with pytest.raises(TypeError):
BaseCollector({})
Run: cd /opt/security-blue-team && python -m pytest tests/test_collectors.py -v
Expected: FAIL (classes don't exist yet)
Step 2: Implement base collector
# blueteam/collectors/base.py
"""Base collector interface."""
from abc import ABC, abstractmethod
from blueteam.models import SecurityEvent
class BaseCollector(ABC):
"""Abstract base for all event collectors."""
def __init__(self, config: dict):
self.config = config
self._last_position = None
@property
@abstractmethod
def name(self) -> str:
"""Collector identifier."""
...
@abstractmethod
def collect(self) -> list[SecurityEvent]:
"""Collect new events since last poll. Returns list of SecurityEvent."""
...
def is_enabled(self) -> bool:
"""Check if this collector is enabled in config."""
collectors = self.config.get("collectors", {})
return collectors.get(self.name, {}).get("enabled", False)
Step 3: Implement DB audit collector
This is the primary collector β reads from the audit_events table written by PHP AuditLogger.
# blueteam/collectors/db_audit.py
"""Database audit event collector β reads from audit_events table."""
from datetime import datetime, timezone
from blueteam.collectors.base import BaseCollector
from blueteam.models import SecurityEvent
from blueteam.db import get_connection
# Map audit categories to severity defaults
SEVERITY_MAP = {
("auth", "login", "failure"): "medium",
("auth", "login", "denied"): "high",
("auth", "login", "success"): "info",
("auth", "password_reset", "success"): "medium",
("auth", "impersonation_start", "success"): "high",
("access", "api_request", "denied"): "medium",
("admin", "user_delete", "success"): "high",
("admin", "role_change", "success"): "high",
("admin", "settings_change", "success"): "high",
("ai", "guardrail_triggered", "denied"): "high",
("system", "rate_limit_hit", "denied"): "medium",
("system", "error_500", "failure"): "medium",
}
# Map categories to NIST controls
NIST_MAP = {
"auth": ["3.3.1", "3.5.2"],
"access": ["3.3.1", "3.3.2", "3.1.1"],
"admin": ["3.3.1", "3.3.2", "3.1.7"],
"data": ["3.3.1", "3.1.3"],
"ai": ["3.3.1", "3.14.6"],
"system": ["3.3.1", "3.14.1"],
}
class DBAuditCollector(BaseCollector):
name = "db_audit"
def __init__(self, config: dict):
super().__init__(config)
self._last_event_id = None
self._last_timestamp = datetime.now(timezone.utc)
def collect(self) -> list[SecurityEvent]:
conn = get_connection(self.config)
with conn.cursor() as cur:
cur.execute("""
SELECT event_id, timestamp, category, action, result,
user_id, session_id, ip_address, user_agent,
resource_type, resource_id, instance_id,
cui_accessed, metadata
FROM audit_events
WHERE timestamp > %s
ORDER BY timestamp ASC
LIMIT 1000
""", (self._last_timestamp,))
rows = cur.fetchall()
events = []
for row in rows:
severity = SEVERITY_MAP.get(
(row["category"], row["action"], row["result"]),
"info" if row["result"] == "success" else "medium"
)
nist = NIST_MAP.get(row["category"], ["3.3.1"])
event = SecurityEvent(
timestamp=row["timestamp"],
source="audit_db",
category=row["category"],
severity=severity,
action=row["action"],
user_id=str(row["user_id"]) if row["user_id"] else None,
ip_address=str(row["ip_address"]) if row["ip_address"] else None,
details={
"result": row["result"],
"session_id": row["session_id"],
"user_agent": row["user_agent"],
"resource_type": row["resource_type"],
"resource_id": row["resource_id"],
"instance_id": str(row["instance_id"]) if row["instance_id"] else None,
**(row["metadata"] or {}),
},
nist_controls=nist,
cui_involved=row["cui_accessed"] or False,
event_id=str(row["event_id"]),
)
events.append(event)
self._last_timestamp = row["timestamp"]
return events
Step 4: Implement syslog parser
# blueteam/collectors/syslog_parser.py
"""Syslog collector β parses auth events from /var/log/syslog or auth.log."""
import re
from datetime import datetime, timezone
from pathlib import Path
from blueteam.collectors.base import BaseCollector
from blueteam.models import SecurityEvent
AUTH_PATTERNS = [
(r"eqmon-auth.*Forgot password rate limited.*email=(\S+)\s+ip=(\S+)",
"auth", "password_reset_rate_limited", "high", ["3.1.8", "3.3.1"]),
(r"eqmon-auth.*Password reset email sent.*email=(\S+)\s+user_id=(\S+)",
"auth", "password_reset_sent", "info", ["3.3.1", "3.5.9"]),
(r"eqmon-auth.*Password reset email FAILED.*email=(\S+)",
"system", "email_failure", "medium", ["3.3.4"]),
(r"eqmon-auth.*Password reset by (\S+) for user (\S+)",
"admin", "admin_password_reset", "high", ["3.3.1", "3.1.7"]),
(r"EQMON.*User deleted.*email=(\S+).*by=(\S+)",
"admin", "user_delete", "high", ["3.3.1", "3.1.7"]),
]
class SyslogCollector(BaseCollector):
name = "syslog"
def __init__(self, config: dict):
super().__init__(config)
self._file_pos = 0
self._path = Path(config.get("collectors", {}).get("syslog", {}).get("path", "/var/log/syslog"))
def collect(self) -> list[SecurityEvent]:
if not self._path.exists():
return []
events = []
try:
with open(self._path) as f:
f.seek(self._file_pos)
for line in f:
event = self._parse_line(line.strip())
if event:
events.append(event)
self._file_pos = f.tell()
except PermissionError:
pass
return events
def _parse_line(self, line: str):
for pattern, category, action, severity, nist in AUTH_PATTERNS:
match = re.search(pattern, line)
if match:
return SecurityEvent(
timestamp=datetime.now(timezone.utc),
source="syslog",
category=category,
severity=severity,
action=action,
details={"raw": line[:500], "groups": match.groups()},
nist_controls=nist,
)
return None
Step 5: Implement nginx log and PHP error collectors (similar pattern)
nginx_log.py β parse access log for 4xx/5xx responses, unusual request patterns. php_error.py β parse structured JSON error logs from error-handler.php.
Both follow the same BaseCollector pattern with file position tracking and regex parsing.
Step 6: Implement red team report collector
# blueteam/collectors/redteam_report.py
"""Red team report collector β imports attack results for posture scoring."""
import json
from datetime import datetime, timezone
from pathlib import Path
from blueteam.collectors.base import BaseCollector
from blueteam.models import SecurityEvent
class RedTeamCollector(BaseCollector):
name = "redteam"
def __init__(self, config: dict):
super().__init__(config)
reports_dir = config.get("collectors", {}).get("redteam", {}).get(
"reports_dir", "/opt/security-red-team/reports"
)
self._reports_dir = Path(reports_dir)
self._imported_files = set()
def collect(self) -> list[SecurityEvent]:
if not self._reports_dir.exists():
return []
events = []
for report_file in sorted(self._reports_dir.glob("*.json")):
if report_file.name in self._imported_files:
continue
self._imported_files.add(report_file.name)
try:
with open(report_file) as f:
report = json.load(f)
for attack in report.get("attacks", []):
for variant in attack.get("variants", []):
if variant.get("result") in ("vulnerable", "partial"):
events.append(SecurityEvent(
timestamp=datetime.now(timezone.utc),
source="redteam",
category="system",
severity=variant.get("severity", "medium").lower(),
action=f"redteam_{variant.get('result', 'unknown')}",
details={
"attack": attack.get("name"),
"variant": variant.get("name"),
"category": attack.get("category"),
"confidence": variant.get("confidence"),
"report_file": report_file.name,
},
nist_controls=variant.get("nist_controls", []),
))
except (json.JSONDecodeError, KeyError):
pass
return events
Step 7: Update collectors/init.py with registry
# blueteam/collectors/__init__.py
"""Collector registry."""
from blueteam.collectors.db_audit import DBAuditCollector
from blueteam.collectors.syslog_parser import SyslogCollector
from blueteam.collectors.nginx_log import NginxLogCollector
from blueteam.collectors.php_error import PHPErrorCollector
from blueteam.collectors.redteam_report import RedTeamCollector
ALL_COLLECTORS = [
DBAuditCollector,
SyslogCollector,
NginxLogCollector,
PHPErrorCollector,
RedTeamCollector,
]
def get_enabled_collectors(config: dict):
"""Return instances of all enabled collectors."""
collectors = []
for cls in ALL_COLLECTORS:
instance = cls(config)
if instance.is_enabled():
collectors.append(instance)
return collectors
Step 8: Run tests, commit
cd /opt/security-blue-team
python -m pytest tests/ -v
git add -A
git commit -m "feat: event collectors for audit DB, syslog, nginx, PHP errors, red team"